How To Replicate A Reader In Go
在使用 Go 编程时,有时我们会需要多次消费某个 io.Reader
的内容(注意:是同一内容读取多次,而不同于通常的多次分段读取),这就需要我们可以“复制”一个 io.Reader
。
而且某些场景下不太方便将 io.Reader
内容一次性读出,比如将 net.Conn
视为 io.Reader
使用或读取超大文件无法全部加载到内存等情形。
所以在这里我们还是讨论 io.Reader
本身的复制。
对于最简单的消费两次的场景,网上最常见的推荐的写法基本是这样的:
func ReplicateReader(r io.Reader) (r1 io.Reader, r2 io.Reader) { buf := bytes.Buffer{} teeR := io.TeeReader(r, &buf) return teeR, &buf }
很多人认为这体现了 Go 简洁高效的特点。
某种程度来说,的确如此,如果使用场景是:仅有两个 io.Reader
的使用者,且至少保证有某一个先读的使用者会将内容读取完毕(直至 io.EOF
)。
首先,为什么这个写法对于超过两个 io.Reader
的使用者就不简洁高效了?
我们这里假定有 n 个使用者 (n > 2)。
一是因为需要 n - 1 次的调用,当然这点可以很简单地被更好的 ReplicateReader()
的定义解决,换为 func(r io.Reader, n int) []io.Reader
即可。
二则是因为这种写法为每一个 io.TeeReader()
都要准备单独的 buffer ,既然有 n - 1 次 io.TeeReader()
的调用当然也就需要有 n - 1 个独立的 buffer 了,这在内存上就不够高效了(毕竟存储的都是来自同一 io.Reader
的内容)。
其次为什么需要保证有某一个先读的使用者将内容读取完毕呢?
因为 io.TeeReader()
其实是有“主动”、“从动”之分的,以之前这个 ReplicateReader()
写法为例,这里返回的 r2
(buf
) 的内容是来自于 r1
(teeR
) 读取过的。
这就要求这个“主动”的 io.Reader
的使用者必须要先行读取内容,不然别人就没内容可读;另外它还得读取完所有内容,因为别人无法读到比它更多的内容。
对于 n 个使用者,当然我们可以优化到仅要求其中 1 个使用者来使用返回的“主动”的这个 io.Reader
剩下的使用者则使用返回的其它“从动”的 io.Reader
即可,甚至于为了 ReplicateReader()
调用者方便还可以将这个“主动”的使用者内置不予返回而返回额外的 n 个“从动”的 io.Reader
由 ReplicateReader
来确保这个“主动”使用者的行为。
这样要么对使用场景有要求,要么就不那么高效(多出一个隐藏的使用者)。
想要实现一个更高效的 ReplicateReader()
可以采用以下做法:
- 返回的每个
io.Reader
独自维护自己的offset
但共用一个buffer
- 若
buffer
还有当前io.Reader
未读取的内容 (offset < len(buffer)
) 则(仅)从buffer
剩余内容读取 - 若
buffer
中内容目前已全被当前io.Reader
读取 (offset == len(buffer)
) ,则从原始的io.Reader
中读取并将内容追加到buffer
中,如遇io.EOF
则对buffer
标记 EOF (将 EOF 视为一种特殊的从原始io.Reader
读取到的内容)
在这个最简单的策略下我们可以保证这些 ReplicateReader()
返回的 io.Reader
的内容的正确性以及完整性(不论它们的使用者以何种顺序如何读取),并且使用 1 个公共的 buffer
+ n 个 offset
代替了原来的 n 个独立的 buffer 。
当然就目前而言,这个 buffer
如同貔貅一样只增不减,我们可以进行更进一步的优化。
简单想想就能够意识到其实 buffer
仅需要维护读的最靠前和最靠后的两个 io.Reader
之间的内容即可,并且更进一步的,因为最靠后的使用者读到的内容正是当前 buffer
中的所有内容所以我们也可以认为 buffer
需要维护的是最靠前的 io.Reader
之后的内容。
但一旦想到这里,另一个问题也就浮上了水面:如果有某个使用者提前退出了,那么它的 io.Reader
的 offset 就不会再更新了,因而我们也要为这些使用者设计一种退出机制。
先对 ReplicateReader()
的定义做一下优化,将它定义为 func(r io.Reader, n int) []io.ReadCloser
,这里返回的 io.ReadCloser
的 Close()
是用来标记它提前结束使用的,不过按照 Go 惯常的使用方式读取完毕到 io.EOF
后再调用 Close()
也是兼容的。
既然为使用者引入了退出机制,就可以接着引入其它新的优化策略了:
- 引入优先级队列来管理各个
offset
,采用小顶堆,因为我们关心的是最靠前的offset
- 返回的
io.ReadCloser
的Close()
调用时会将当前使用者从优先级队列中移出 - 为
buffer
引入base
并修改规则 2. 和 3. 的比对条件,如果最靠前的offset
在buffer
中对应位置已经在后半部分则将buffer
中的内容前移并更新base
- 如果仅剩一个使用者,当它从原始的
io.Reader
中读取时,就不必再将读到的内容拷贝至buffer
了
我按照这个策略实现了一版 ReplicateReader()
放在 repreader.go ,欢迎取用。
P.S. io.Writer
的文档中要求如果没能一次性写入就必须返回一个错误(也就是说没错误的话必须写完传入的内容),不过幸好 io.Reader
没这要求不然读取操作的实现还得更复杂些。
Write must return a non-nil error if it returns n < len(p).